{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "name": "tfp_anomaly_detection.ipynb",
      "provenance": [],
      "collapsed_sections": []
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "cells": [
    {
      "cell_type": "code",
      "metadata": {
        "id": "iXDk02sFYgJy"
      },
      "source": [
        "# Copyright 2021 Google LLC\n",
        "#\n",
        "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
        "# you may not use this file except in compliance with the License.\n",
        "# You may obtain a copy of the License at\n",
        "#\n",
        "#     https://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing, software\n",
        "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
        "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
        "# See the License for the specific language governing permissions and\n",
        "# limitations under the License."
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "QZo6Stv1Z2Nf"
      },
      "source": [
        "<table align=\"left\">\n",
        "\n",
        "  <td>\n",
        "    <a href=\"https://colab.research.google.com/github/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/experimental/tensorflow_probability/anomaly_detection/tfp_anomaly_detection.ipynb\">\n",
        "      <img src=\"https://cloud.google.com/ml-engine/images/colab-logo-32px.png\" alt=\"Colab logo\"> Run in Colab\n",
        "    </a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a href=\"https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/experimental/tensorflow_probability/anomaly_detection/tfp_anomaly_detection.ipynb\">\n",
        "      <img src=\"https://cloud.google.com/ml-engine/images/github-logo-32px.png\" alt=\"GitHub logo\">\n",
        "      View on GitHub\n",
        "    </a>\n",
        "  </td>\n",
        "</table>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "YRvy0UhwcjOm"
      },
      "source": [
        "# Anomaly Detection with TensorFlow Probability STS on Kubeflow Pipelines"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "0oCL7T67aa02"
      },
      "source": [
        "## Overview\n",
        "\n",
        "This notebook demonstrates how to use [TensorFlow Probability](https://www.tensorflow.org/probability) and [Kubeflow Pipelines](https://www.kubeflow.org/docs/components/pipelines/) for anomaly detection in time series data. It uses structural time series (STS) models, a class of Bayesian statistical models that decompose a time series into interpretable seasonal and trend components. The algorithm fits an STS model to the time series, computes a predictive range of acceptable values for each timestep, and flags any point that falls outside that range as an anomaly. To learn more about STS models, check out this demo on [Structural Time Series Modeling Case Studies](https://www.tensorflow.org/probability/examples/Structural_Time_Series_Modeling_Case_Studies_Atmospheric_CO2_and_Electricity_Demand).\n",
        "\n",
        "This demo is most relevant for those who would like to automatically flag anomalies in time series data and can be used for applications like network monitoring, infrastructure maintenance, and sales tracking.\n",
        "\n",
        "### Dataset\n",
        "\n",
        "This demo currently generates a synthetic hourly time series with two injected anomalies (see `load_data` in the component definition below). A planned update will replace the synthetic data with the [Numenta Anomaly Benchmark](https://github.com/numenta/NAB), a popular benchmark of time series data with labelled anomalies, and specifically with [nyc_taxi.csv](https://github.com/numenta/NAB/blob/d2854d17a3feb9e143b1e9a715c5af67da2c1888/data/realKnownCause/nyc_taxi.csv), which reports the number of NYC taxi passengers over time.\n",
        "\n",
        "### Objective\n",
        "\n",
        "In this notebook, you will learn how to:\n",
        "\n",
        "* Infer the frequency of a time series and regularize it with `tfp.sts.regularize_series`\n",
        "* Infer the structure of the time series using `tensorflow_probability.python.sts.default_model.build_default_model`\n",
        "* Fit the model with variational inference using `tfp.vi.fit_surrogate_posterior`\n",
        "* Identify anomalies based on the predictive distribution of acceptable values at each timestep\n",
        "* Launch the anomaly detection algorithm on Kubeflow Pipelines\n",
        "\n",
        "### Costs\n",
        "This tutorial uses billable components of Google Cloud:\n",
        "\n",
        "* Vertex AI\n",
        "* Cloud Storage\n",
        "\n",
        "Learn about [Vertex AI\n",
        "pricing](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage\n",
        "pricing](https://cloud.google.com/storage/pricing), and use the [Pricing\n",
        "Calculator](https://cloud.google.com/products/calculator/)\n",
        "to generate a cost estimate based on your projected usage."
      ]
    },
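    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "As a rough illustration of the flagging rule described in the overview, the sketch below scores each observation with a two-sided tail probability and flags the points whose probability falls below a threshold. It assumes, for simplicity, that the forecast at each timestep is a single Gaussian with a known mean and standard deviation; the component defined later in this notebook instead uses the predictive distribution of the fitted STS model. The helper names and sample values are hypothetical.\n",
        "\n",
        "```python\n",
        "from statistics import NormalDist\n",
        "\n",
        "\n",
        "def two_sided_pvalue(observed, mean, stddev):\n",
        "    # Tail-area probability of `observed` under a Normal(mean, stddev) forecast.\n",
        "    prob_lower = NormalDist(mean, stddev).cdf(observed)\n",
        "    return 2 * min(prob_lower, 1 - prob_lower)\n",
        "\n",
        "\n",
        "def flag_anomalies(observations, means, stddevs, anomaly_threshold=0.01):\n",
        "    # Return the indices whose p-value falls below the threshold.\n",
        "    return [\n",
        "        i for i, (y, m, s) in enumerate(zip(observations, means, stddevs))\n",
        "        if two_sided_pvalue(y, m, s) < anomaly_threshold\n",
        "    ]\n",
        "\n",
        "\n",
        "# Hypothetical forecast: constant mean 10 and standard deviation 2.\n",
        "observations = [9.5, 11.0, 26.0, 10.2]\n",
        "print(flag_anomalies(observations, [10.0] * 4, [2.0] * 4))  # prints [2]\n",
        "```\n",
        "\n",
        "Lowering `anomaly_threshold` flags fewer points, because only observations further into the tails of the forecast are treated as anomalous."
      ]
    },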
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "UsKeVhtHbYeU"
      },
      "source": [
        "### Set up your local development environment\n",
        "\n",
        "**If you are using Colab or Google Cloud Notebooks**, your environment already meets\n",
        "all the requirements to run this notebook. You can skip this step.\n",
        "\n",
        "**Otherwise**, make sure your environment meets this notebook's requirements.\n",
        "You need the following:\n",
        "\n",
        "* The Google Cloud SDK\n",
        "* Git\n",
        "* Python 3\n",
        "* virtualenv\n",
        "* Jupyter notebook running in a virtual environment with Python 3\n",
        "\n",
        "The Google Cloud guide to [Setting up a Python development\n",
        "environment](https://cloud.google.com/python/setup) and the [Jupyter\n",
        "installation guide](https://jupyter.org/install) provide detailed instructions\n",
        "for meeting these requirements. The following steps provide a condensed set of\n",
        "instructions:\n",
        "\n",
        "1. [Install and initialize the Cloud SDK.](https://cloud.google.com/sdk/docs/)\n",
        "\n",
        "1. [Install Python 3.](https://cloud.google.com/python/setup#installing_python)\n",
        "\n",
        "1. [Install\n",
        "   virtualenv](https://cloud.google.com/python/setup#installing_and_using_virtualenv)\n",
        "   and create a virtual environment that uses Python 3. Activate the virtual environment.\n",
        "\n",
        "1. To install Jupyter, run `pip3 install jupyter` on the\n",
        "command-line in a terminal shell.\n",
        "\n",
        "1. To launch Jupyter, run `jupyter notebook` on the command-line in a terminal shell.\n",
        "\n",
        "1. Open this notebook in the Jupyter Notebook Dashboard."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "i7EUnXsZhAGF"
      },
      "source": [
        "### Install additional packages\n",
        "\n",
        "Install additional package dependencies not installed in your notebook environment."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "2b4ef9b72d43"
      },
      "source": [
        "import os\n",
        "\n",
        "# The Google Cloud Notebook product has specific requirements\n",
        "IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists(\"/opt/deeplearning/metadata/env_version\")\n",
        "\n",
        "# Google Cloud Notebook requires dependencies to be installed with '--user'\n",
        "USER_FLAG = \"\"\n",
        "if IS_GOOGLE_CLOUD_NOTEBOOK:\n",
        "    USER_FLAG = \"--user\""
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "8EO5Ar64fidZ"
      },
      "source": [
        "! pip3 install {USER_FLAG} --upgrade kfp"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "wIibh1qxkc_G"
      },
      "source": [
        "### Restart the kernel\n",
        "\n",
        "After you install the additional packages, you need to restart the notebook kernel so it can find the packages."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "zGbkBMWPPvPY"
      },
      "source": [
        "# Restart the kernel so that the newly installed packages can be imported.\n",
        "import os\n",
        "\n",
        "if not os.getenv(\"IS_TESTING\"):\n",
        "    import IPython\n",
        "\n",
        "    app = IPython.Application.instance()\n",
        "    app.kernel.do_shutdown(True)"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "No37uLsjk1UN"
      },
      "source": [
        "## Before you begin\n",
        "\n",
        "### Set up your Google Cloud project\n",
        "\n",
        "**The following steps are required, regardless of your notebook environment.**\n",
        "\n",
        "1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.\n",
        "\n",
        "1. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).\n",
        "\n",
        "1. [Enable the Vertex AI API, Cloud Build API, Cloud Storage API, and Container Registry API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com,cloudbuild.googleapis.com,storage.googleapis.com,containerregistry.googleapis.com).\n",
        "\n",
        "1. If you are running this notebook locally, you will need to install the [Cloud SDK](https://cloud.google.com/sdk).\n",
        "\n",
        "1. Enter your project ID in the cell below. Then run the cell to make sure the\n",
        "Cloud SDK uses the right project for all the commands in this notebook.\n",
        "\n",
        "**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$` or wrapped in `{}` into these commands."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ff394ff4"
      },
      "source": [
        "#### Set your project ID\n",
        "\n",
        "**If you don't know your project ID**, you may be able to get your project ID using `gcloud`."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "collapsed": true,
        "id": "1988b81c",
        "jupyter": {
          "outputs_hidden": true
        },
        "outputId": "6a6488f6-6883-4246-a197-b3c5816dab0d"
      },
      "source": [
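        "import os\n",
        "\n",
        "# Default to an empty ID so that the cells below can detect a missing value.\n",
        "PROJECT_ID = \"\"\n",
        "\n",
        "# Get your Google Cloud project ID from gcloud\n",
        "if not os.getenv(\"IS_TESTING\"):\n",
        "  shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null\n",
        "  PROJECT_ID = shell_output[0] if shell_output else \"\"\n",
        "  print(\"Project ID: \", PROJECT_ID)"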
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "Project ID:  \n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "5adca8fc"
      },
      "source": [
        "Otherwise, set your project ID here."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "collapsed": true,
        "id": "939b4566",
        "jupyter": {
          "outputs_hidden": true
        }
      },
      "source": [
        "if PROJECT_ID == \"\" or PROJECT_ID is None:\n",
        "  PROJECT_ID = \"[your-project-id]\"  # @param {type:\"string\"}"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "ypoM_48NJCuU",
        "outputId": "82b0e9ed-0728-4ce8-dcb8-8acfac3ca6e5"
      },
      "source": [
        "!gcloud config set project {PROJECT_ID}"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "Updated property [core/project].\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "2df8c231"
      },
      "source": [
        "#### Timestamp\n",
        "\n",
        "If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users, create a timestamp for each session and append it to the names of the resources you create in this tutorial."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "collapsed": true,
        "id": "84ba4ab1",
        "jupyter": {
          "outputs_hidden": true
        }
      },
      "source": [
        "from datetime import datetime\n",
        "\n",
        "TIMESTAMP = datetime.now().strftime(\"%Y%m%d%H%M%S\")"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "dr--iN2kAylZ"
      },
      "source": [
        "### Authenticate your Google Cloud account\n",
        "\n",
        "**If you are using Google Cloud Notebooks**, your environment is already\n",
        "authenticated. Skip this step."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "sBCra4QMA2wR"
      },
      "source": [
        "**If you are using Colab**, run the cell below and follow the instructions\n",
        "when prompted to authenticate your account via OAuth.\n",
        "\n",
        "**Otherwise**, follow these steps:\n",
        "\n",
        "1. In the Cloud Console, go to the [**Create service account key**\n",
        "   page](https://console.cloud.google.com/apis/credentials/serviceaccountkey).\n",
        "\n",
        "2. Click **Create service account**.\n",
        "\n",
        "3. In the **Service account name** field, enter a name, and\n",
        "   click **Create**.\n",
        "\n",
        "4. In the **Grant this service account access to project** section, click the **Role** drop-down list. Type \"Vertex AI\"\n",
        "into the filter box, and select\n",
        "   **Vertex AI Administrator**. Type \"Storage Object Admin\" into the filter box, and select **Storage Object Admin**.\n",
        "\n",
        "5. Click **Create**. A JSON file that contains your key downloads to your\n",
        "local environment.\n",
        "\n",
        "6. Enter the path to your service account key as the\n",
        "`GOOGLE_APPLICATION_CREDENTIALS` variable in the cell below and run the cell."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "PyQmSRbKA8r-"
      },
      "source": [
        "import os\n",
        "import sys\n",
        "\n",
        "# If you are running this notebook in Colab, run this cell and follow the\n",
        "# instructions to authenticate your GCP account. This provides access to your\n",
        "# Cloud Storage bucket and lets you submit training jobs and prediction\n",
        "# requests.\n",
        "\n",
        "# The Google Cloud Notebook product has specific requirements\n",
        "IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists(\"/opt/deeplearning/metadata/env_version\")\n",
        "\n",
        "# If on Google Cloud Notebooks, then don't execute this code\n",
        "if not IS_GOOGLE_CLOUD_NOTEBOOK:\n",
        "    if \"google.colab\" in sys.modules:\n",
        "        from google.colab import auth as google_auth\n",
        "\n",
        "        google_auth.authenticate_user()\n",
        "\n",
        "    # If you are running this notebook locally, replace the string below with the\n",
        "    # path to your service account key and run this cell to authenticate your GCP\n",
        "    # account.\n",
        "    elif not os.getenv(\"IS_TESTING\"):\n",
        "        %env GOOGLE_APPLICATION_CREDENTIALS ''"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "zgPO1eR3CYjk"
      },
      "source": [
        "### Create a Cloud Storage bucket\n",
        "\n",
        "**The following steps are required, regardless of your notebook environment.**\n",
        "\n",
        "When you submit a pipeline run, Vertex AI saves the resulting artifacts to the Cloud Storage bucket that you specify.\n",
        "\n",
        "We will also use the same bucket to download and host the input data.\n",
        "\n",
        "Set the name of your Cloud Storage bucket below. It must be unique across all\n",
        "Cloud Storage buckets.\n",
        "\n",
        "You may also change the `REGION` variable, which is used for operations\n",
        "throughout the rest of this notebook. Make sure to [choose a region where Vertex AI services are\n",
        "available](https://cloud.google.com/vertex-ai/docs/general/locations#available_regions). You may not use a Multi-Regional Storage bucket for training with Vertex AI."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "MzGDU7TWdts_"
      },
      "source": [
        "BUCKET_NAME = \"gs://[your-bucket-name]\"  # @param {type:\"string\"}\n",
        "REGION = \"[your-region]\"  # @param {type:\"string\"}"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "cf221059d072"
      },
      "source": [
        "if BUCKET_NAME == \"\" or BUCKET_NAME is None or BUCKET_NAME == \"gs://[your-bucket-name]\":\n",
        "    # Fall back to a bucket name derived from the project ID and timestamp.\n",
        "    BUCKET_NAME = \"gs://\" + PROJECT_ID + \"-aip-\" + TIMESTAMP"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "-EcIXiGsCePi"
      },
      "source": [
        "**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "NIq7R4HZCfIc"
      },
      "source": [
        "! gsutil mb -l $REGION $BUCKET_NAME"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ucvCsknMCims"
      },
      "source": [
        "Finally, validate access to your Cloud Storage bucket by examining its contents:"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "vhOb7YnwClBb"
      },
      "source": [
        "! gsutil ls -al $BUCKET_NAME"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "h01b41bGpu6l"
      },
      "source": [
        "### Import libraries and define constants"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "4NUcY9w0pidh"
      },
      "source": [
        "PIPELINE_NAME = '{0}-{1}'.format('tfp-anomaly-detection', TIMESTAMP)\n",
        "PIPELINE_ROOT = '{0}/{1}'.format(BUCKET_NAME, PIPELINE_NAME)"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "6MsrEnjUJMDB"
      },
      "source": [
        "from typing import NamedTuple, OrderedDict, Callable, Optional, Mapping, Any\n",
        "\n",
        "import kfp\n",
        "from kfp.v2 import compiler\n",
        "from kfp.v2 import dsl\n",
        "from kfp.v2.google.client import AIPlatformClient"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "8aSrF6n1noLh"
      },
      "source": [
        "### Define the anomaly detection component"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "ZeMp71LgJYMF"
      },
      "source": [
        "def train(\n",
        "    num_samples: int = 50,\n",
        "    max_num_steps: int = 1000,\n",
        "    jit_compile: bool = True,\n",
        "    seed: int = None,\n",
        "    anomaly_threshold: float = 0.01\n",
        ") -> NamedTuple('PredictionOutput',\n",
        "                [('time', list), ('observed_series', list), ('all_times', list),\n",
        "                 ('mean', list), ('upper_limit', list), ('lower_limit', list),\n",
        "                 ('anomalies', list), ('pvalues', list)]):\n",
        "  \"\"\"Uses TFP STS to regularize a time series, fit a model, and predict anomalies.\n",
        "\n",
        "  Args:\n",
        "    num_samples: Number of parameter samples to draw from the surrogate\n",
        "      posterior.\n",
        "    max_num_steps: Maximum number of optimizer steps; fitting may stop earlier\n",
        "      if the convergence criterion is met.\n",
        "    jit_compile: If True, compiles the loss function and gradient update using\n",
        "      XLA.\n",
        "    seed: The random seed to use for sampling.\n",
        "    anomaly_threshold: Any data point with a pvalue lower than this threshold is\n",
        "      labelled anomalous.\n",
        "\n",
        "  Returns:\n",
        "    time: Timestamps of the scored points; each index aligns with the indices\n",
        "      of the other outputs.\n",
        "    observed_series: Observed values of the input series at those timestamps.\n",
        "    all_times: Same as time for anomaly detection but contains more timestamps\n",
        "      for forecasting.\n",
        "    mean: Mean of the one-step predictive distribution at each timestep.\n",
        "    upper_limit: Upper bound of the acceptable range at each timestep, i.e. the\n",
        "      1 - anomaly_threshold / 2 quantile of the predictive distribution.\n",
        "    lower_limit: Lower bound of the acceptable range at each timestep, i.e. the\n",
        "      anomaly_threshold / 2 quantile of the predictive distribution.\n",
        "    anomalies: Indices of timestamps with anomalies.\n",
        "    pvalues: For each point, the two-sided tail-area probability of the data\n",
        "      point under the predictive distribution.\n",
        "  \"\"\"\n",
        "\n",
        "  import itertools\n",
        "  import numpy as np\n",
        "  import pandas as pd\n",
        "  import tensorflow.compat.v2 as tf\n",
        "  import tensorflow_probability as tfp\n",
        "  from tensorflow_probability.python.sts import default_model\n",
        "  from tensorflow_probability.python.sts.internal import util as sts_util\n",
        "  from tensorflow_probability.python.sts import one_step_predictive\n",
        "\n",
        "  def load_data() -> pd.DataFrame:\n",
        "    \"\"\"Generates synthetic hourly data with two injected anomalies.\n",
        "\n",
        "    Returns:\n",
        "      Pandas DataFrame with a `value` column indexed by timestamp.\n",
        "    \"\"\"\n",
        "    times = pd.date_range(\n",
        "        start='2020-01-05 00:00:00',\n",
        "        end='2020-01-12 00:00:00',\n",
        "        freq=pd.DateOffset(hours=1))\n",
        "\n",
        "    xs = np.arange(len(times))\n",
        "    ys = 8 * np.sin(1. / 24. * 2 * np.pi * xs)\n",
        "    ys += 4 * np.cos(1. / 6 * 2 * np.pi * xs)\n",
        "    ys += 0.001 * (xs - 70)**2\n",
        "    ys += 3 * np.random.randn(len(ys))\n",
        "    ys[137] += 16.0\n",
        "    ys[138] += 14.0\n",
        "\n",
        "    return pd.DataFrame(data={'value': ys}, index=times)\n",
        "\n",
        "  def detect_anomalies(\n",
        "      regularized_series: pd.DataFrame, model: tfp.sts.StructuralTimeSeries,\n",
        "      posterior_samples: OrderedDict[str, tf.Tensor], anomaly_threshold: float\n",
        "  ) -> NamedTuple('PredictionOutput',\n",
        "                  [('time', list), ('observed_series', list),\n",
        "                   ('all_times', list), ('mean', list), ('upper_limit', list),\n",
        "                   ('lower_limit', list), ('anomalies', list),\n",
        "                   ('pvalues', list)]):\n",
        "    \"\"\"Given a model, posterior, and anomaly_threshold, identifies anomalous time points in a time series.\n",
        "\n",
        "    Args:\n",
        "      regularized_series: A time series with a regular frequency.\n",
        "      model: A fitted model that approximates the time series.\n",
        "      posterior_samples: Posterior samples of model parameters.\n",
        "      anomaly_threshold: The anomaly threshold passed as a parameter in the\n",
        "        outer function.\n",
        "\n",
        "    Returns:\n",
        "      PredictionOutput with the distribution of acceptable time series values\n",
        "      and detected anomalies.\n",
        "    \"\"\"\n",
        "    [observed_time_series, mask\n",
        "    ] = sts_util.canonicalize_observed_time_series_with_mask(regularized_series)\n",
        "\n",
        "    anomaly_threshold = tf.convert_to_tensor(\n",
        "        anomaly_threshold, dtype=observed_time_series.dtype)\n",
        "\n",
        "    # The one-step predictive distribution covers the final `T - 1` timesteps.\n",
        "    predictive_dist = one_step_predictive(\n",
        "        model,\n",
        "        regularized_series[:-1],\n",
        "        posterior_samples,\n",
        "        timesteps_are_event_shape=False)\n",
        "    observed_series = observed_time_series[..., 1:, 0]\n",
        "    times = regularized_series.index[1:]\n",
        "\n",
        "    # Compute the tail probabilities (pvalues) of the observed series.\n",
        "    prob_lower = predictive_dist.cdf(observed_series)\n",
        "    tail_probabilities = 2 * tf.minimum(prob_lower, 1 - prob_lower)\n",
        "\n",
        "    # Since quantiles of a mixture distribution are not analytically available,\n",
        "    # use scalar root search to compute the upper and lower bounds.\n",
        "    predictive_mean = predictive_dist.mean()\n",
        "    predictive_stddev = predictive_dist.stddev()\n",
        "    target_log_cdfs = tf.stack(\n",
        "        [\n",
        "            tf.math.log(anomaly_threshold / 2.) *\n",
        "            tf.ones_like(predictive_mean),  # Target log CDF at lower bound.\n",
        "            tf.math.log1p(-anomaly_threshold / 2.) *\n",
        "            tf.ones_like(predictive_mean)  # Target log CDF at upper bound.\n",
        "        ],\n",
        "        axis=0)\n",
        "    limits, _, _ = tfp.math.find_root_chandrupatla(\n",
        "        objective_fn=lambda x: target_log_cdfs - predictive_dist.log_cdf(x),\n",
        "        low=tf.stack([\n",
        "            predictive_mean - 100 * predictive_stddev,\n",
        "            predictive_mean - 10 * predictive_stddev\n",
        "        ],\n",
        "                     axis=0),\n",
        "        high=tf.stack([\n",
        "            predictive_mean + 10 * predictive_stddev,\n",
        "            predictive_mean + 100 * predictive_stddev\n",
        "        ],\n",
        "                      axis=0))\n",
        "\n",
        "    # Identify anomalies.\n",
        "    anomalies = np.less(tail_probabilities, anomaly_threshold)\n",
        "    if mask is not None:\n",
        "      anomalies = np.logical_and(anomalies, ~mask)\n",
        "    observed_anomalies = list(\n",
        "        itertools.compress(range(len(times)), list(anomalies)))\n",
        "\n",
        "    times = times.strftime('%Y-%m-%d %H:%M:%S').tolist()\n",
        "    observed_series = observed_series.numpy().tolist()\n",
        "    predictive_mean = predictive_mean.numpy().tolist()\n",
        "    lower_limit = limits[0].numpy().tolist()\n",
        "    upper_limit = limits[1].numpy().tolist()\n",
        "    tail_probabilities = tail_probabilities.numpy().tolist()\n",
        "\n",
        "    # The order must match the fields of the PredictionOutput NamedTuple.\n",
        "    return (times, observed_series, times, predictive_mean, upper_limit,\n",
        "            lower_limit, observed_anomalies, tail_probabilities)\n",
        "\n",
        "  data = load_data()\n",
        "  regularized_series = tfp.sts.regularize_series(data)\n",
        "\n",
        "  # TODO: Add seed to fit_surrogate_posterior.\n",
        "  model = default_model.build_default_model(regularized_series)\n",
        "  surrogate_posterior = tfp.sts.build_factored_surrogate_posterior(\n",
        "      model, seed=seed)\n",
        "  _ = tfp.vi.fit_surrogate_posterior(\n",
        "      target_log_prob_fn=model.joint_log_prob(regularized_series),\n",
        "      surrogate_posterior=surrogate_posterior,\n",
        "      optimizer=tf.optimizers.Adam(0.1),\n",
        "      num_steps=max_num_steps,\n",
        "      jit_compile=jit_compile,\n",
        "      convergence_criterion=(\n",
        "          tfp.optimizer.convergence_criteria.SuccessiveGradientsAreUncorrelated(\n",
        "              window_size=20, min_num_steps=50)))\n",
        "\n",
        "  posterior_samples = surrogate_posterior.sample(num_samples, seed=seed)\n",
        "  return detect_anomalies(regularized_series, model, posterior_samples,\n",
        "                          anomaly_threshold)"
      ],
      "execution_count": null,
      "outputs": []
    },
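    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The component above notes that quantiles of a mixture distribution are not analytically available, so it finds the lower and upper limits with a scalar root search. The sketch below shows the same idea on a toy problem. It substitutes plain bisection for `tfp.math.find_root_chandrupatla`, works with the CDF rather than the log CDF, and uses an equally weighted mixture of Gaussians in place of the STS predictive distribution; the helper names and component values are hypothetical.\n",
        "\n",
        "```python\n",
        "from statistics import NormalDist\n",
        "\n",
        "\n",
        "def mixture_cdf(x, components):\n",
        "    # CDF of an equally weighted mixture of Normal(mean, stddev) components.\n",
        "    return sum(NormalDist(m, s).cdf(x) for m, s in components) / len(components)\n",
        "\n",
        "\n",
        "def mixture_quantile(prob, components, low, high, tol=1e-9):\n",
        "    # Bisection: the CDF is monotone, so shrink [low, high] around the\n",
        "    # root of mixture_cdf(x) - prob.\n",
        "    while high - low > tol:\n",
        "        mid = 0.5 * (low + high)\n",
        "        if mixture_cdf(mid, components) < prob:\n",
        "            low = mid\n",
        "        else:\n",
        "            high = mid\n",
        "    return 0.5 * (low + high)\n",
        "\n",
        "\n",
        "# Hypothetical posterior draws: each (mean, stddev) pair is one component.\n",
        "components = [(9.0, 2.0), (10.0, 2.5), (11.0, 2.0)]\n",
        "anomaly_threshold = 0.01\n",
        "lower_limit = mixture_quantile(anomaly_threshold / 2, components, -100.0, 100.0)\n",
        "upper_limit = mixture_quantile(1 - anomaly_threshold / 2, components, -100.0, 100.0)\n",
        "print(lower_limit, upper_limit)\n",
        "```\n",
        "\n",
        "Observations below `lower_limit` or above `upper_limit` are exactly the ones whose two-sided tail probability is smaller than `anomaly_threshold`."
      ]
    },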
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "qFFIz_X0n7p2"
      },
      "source": [
        "### Execute the component remotely"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "ukyXFOA9Htga"
      },
      "source": [
        "packages = [\n",
        "    'numpy', 'pandas', 'tf-nightly',\n",
        "    'git+https://github.com/tensorflow/probability.git'\n",
        "]\n",
        "train_op = kfp.components.create_component_from_func_v2(\n",
        "    train,\n",
        "    packages_to_install=packages)"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "s1vqx-VGJrzT"
      },
      "source": [
        "@dsl.pipeline(\n",
        "    pipeline_root=PIPELINE_ROOT, name=PIPELINE_NAME)\n",
        "def pipeline() -> None:\n",
        "  \"\"\"Trains the model and returns detected anomalies.\"\"\"\n",
        "\n",
        "  train_task = train_op()"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "dpZB1t1CJJCT"
      },
      "source": [
        "def run_pipeline(pipeline: Callable,\n",
        "                 parameter_values: Optional[Mapping[str, Any]] = None,\n",
        "                 enable_caching: bool = False) -> None:\n",
        "  \"\"\"Compiles and runs a given pipeline function using Kubeflow Pipelines.\n",
        "\n",
        "  Args:\n",
        "   pipeline: The function to run.\n",
        "   parameter_values: Parameters passed to the pipeline function when run.\n",
        "   enable_caching: Whether to use cached results from previous runs.\n",
        "  \"\"\"\n",
        "  # Compile the pipeline to a job spec file, then submit that file.\n",
        "  job_spec_path = '{}_pipeline.json'.format(PIPELINE_NAME)\n",
        "  compiler.Compiler().compile(\n",
        "    pipeline_func=pipeline,\n",
        "    package_path=job_spec_path)\n",
        "\n",
        "  api_client = AIPlatformClient(\n",
        "    project_id=PROJECT_ID,\n",
        "    region=REGION,\n",
        "  )\n",
        "\n",
        "  # Avoid a mutable default argument: fall back to an empty mapping here.\n",
        "  _ = api_client.create_run_from_job_spec(\n",
        "    job_spec_path=job_spec_path,\n",
        "    pipeline_root=PIPELINE_ROOT,\n",
        "    parameter_values=parameter_values or {},\n",
        "    enable_caching=enable_caching)"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "BLwijes3q1Qf"
      },
      "source": [
        "run_pipeline(pipeline)"
      ],
      "execution_count": null,
      "outputs": []
    }
  ]
}